package com;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@SuppressWarnings("all")
public class ORCWriter611 {

    /** Number of concurrent writer tasks; also the number of poison pills emitted. */
    private static final int WRITER_COUNT = 5;

    /** Total number of synthetic rows generated by the producer. */
    private static final int TOTAL_ROWS = 1000;

    /** Zero-length array used as an end-of-stream marker on the queue. */
    private static final String[] POISON_PILL = new String[0];

    /**
     * Benchmark driver: a single producer thread generates JSON-encoded rows into a
     * bounded queue; WRITER_COUNT pool tasks drain the queue, each writing its rows
     * into its own local ORC file (LZ4-compressed, ORC v0.12).
     *
     * Fixes over the previous version: rows are no longer written to the ORC file
     * once per loop iteration (which duplicated the whole buffered batch on every
     * row), the batch is never reset in the middle of assembling a row, the writer
     * is always closed, and the process actually terminates.
     */
    public static void main(String[] args) throws Exception {
        // Define the ORC structure, i.e. the table schema: three string columns.
        final TypeDescription schema = TypeDescription.createStruct();
        schema.addField("channel", TypeDescription.createString());
        schema.addField("time", TypeDescription.createString());
        schema.addField("params", TypeDescription.createString());

        final Gson gson = new Gson();
        // Bounded hand-off queue: the producer blocks when the writers fall behind.
        final LinkedBlockingQueue<String[]> contents = new LinkedBlockingQueue<String[]>(40);

        startProducer(contents, gson);

        ExecutorService service = newWriterPool();
        for (int i = 0; i < WRITER_COUNT; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    writeOrc(schema, contents);
                }
            });
        }
        // Previously the pool was never shut down, so the JVM hung forever.
        service.shutdown();
        service.awaitTermination(1, TimeUnit.HOURS);
    }

    /**
     * Starts the producer thread: generates TOTAL_ROWS rows of
     * {channel, timestamp, ~15000-element JSON float array}, then enqueues one
     * poison pill per writer so every consumer terminates cleanly.
     */
    private static void startProducer(final BlockingQueue<String[]> contents, final Gson gson) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Reused across rows to avoid reallocating a 15000-element list per row.
                List<Float> arr = new ArrayList<Float>(15000);
                try {
                    for (int k = 0; k < TOTAL_ROWS; k++) {
                        arr.clear();
                        String[] values = new String[3];
                        values[0] = "a";
                        values[1] = Long.toString(System.currentTimeMillis() + k);
                        for (int j = 2; j < 15002; j++) {
                            arr.add(j * 1.234454f);
                        }
                        values[2] = gson.toJson(arr);
                        contents.put(values);
                    }
                    // One end-of-stream marker per consumer.
                    for (int i = 0; i < WRITER_COUNT; i++) {
                        contents.put(POISON_PILL);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

    /**
     * Drains rows from the queue into a per-thread local ORC file until a poison
     * pill (zero-length array) is seen. The row batch is flushed only when full —
     * and only after the entire row has been populated — then once more at the end
     * for any remaining partial batch. The writer is closed on every exit path.
     */
    private static void writeOrc(TypeDescription schema, BlockingQueue<String[]> contents) {
        Writer writer = null;
        try {
            long start = System.currentTimeMillis();
            System.out.println("load start---------------");
            // Absolute local path of the output ORC file, unique per thread.
            String orcFile = "/home/hdfs/data/fly_param_" + Thread.currentThread().getId() + ".orc";
            Configuration conf = new Configuration();
            FileSystem.getLocal(conf);
            writer = OrcFile.createWriter(
                    new Path(orcFile),
                    OrcFile.writerOptions(conf)
                            .setSchema(schema)
                            .compress(CompressionKind.LZ4)
                            .version(OrcFile.Version.V_0_12)
            );
            VectorizedRowBatch batch = schema.createRowBatch();
            long written = 0;
            while (true) {
                String[] values = contents.take();
                if (values.length == 0) {
                    // Poison pill: no more rows for this writer.
                    break;
                }
                int row = batch.size++;
                for (int col = 0; col < values.length; col++) {
                    // Explicit charset: the default getBytes() is platform-dependent.
                    ((BytesColumnVector) batch.cols[col])
                            .setVal(row, values[col].getBytes(StandardCharsets.UTF_8));
                }
                written++;
                // Flush only when the batch is full, and only after the whole row
                // has been set; a mid-row reset would corrupt the row in progress.
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
                if (written % 100 == 0) {
                    long now = System.currentTimeMillis();
                    System.out.println(Thread.currentThread() +
                            " total elapsed=>" + (now - start) +
                            " queue size=>" + contents.size() +
                            " rows written=>" + written +
                            " avg ms/row=>" + ((now - start) / (double) written));
                }
            }
            // Flush whatever is left in the final, partially-filled batch.
            if (batch.size > 0) {
                writer.addRowBatch(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Builds the writer pool. The rejection handler applies back-pressure by
     * re-offering the task with a short timeout; if that still fails the drop is
     * logged instead of being silent, and interruption status is restored.
     */
    private static ExecutorService newWriterPool() {
        return new ThreadPoolExecutor(WRITER_COUNT, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(20),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            if (!executor.getQueue().offer(r, 10, TimeUnit.MILLISECONDS)) {
                                System.err.println("task rejected and dropped: " + r);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
    }

}